package pfq.demo.rx;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

/**
 * 演示RxJava2的背压
 * 啥叫背压：在异步场景中，被观察者发送事件的速度远远快于观察者处理事件的速度的情况下，一种告诉上游的被观察者降低发送速度的策略
 */
public class Backpressure {
    public static void main(String[] args) {

        // Observable不支持背压
        Observable.range(0, 10).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observable onSubscribe start");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("Observable onNext：" + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

        // Flowable支持背压
        Flowable.range(0, 10)
                .subscribe(new Subscriber<Integer>() {
                    Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        System.out.println("Flowable onSubscribe start");
                        subscription = s;
                        subscription.request(1);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("Flowable onNext：" + integer);

                        // 下游调用request(n)来告诉上游发送多少个数据
                        // 同时，subscription也可用于取消观察，跟Observer中的Disposable类似，只是命名不一样而已
                        subscription.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }
}
